{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "9a80963f-97d4-4571-a015-5fb40b912118",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# 共享变量"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "5078e572-59f1-427b-9111-781a775fd59a",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 广播变量"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "329b9c23-ba74-4955-929a-895d23221c5b",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "from pyspark.taskcontext import TaskContext\n",
    "import socket\n",
    "import threading\n",
    "\n",
    "rdd = sc.parallelize(range(1,1001),10)\n",
    "\n",
    "cst = \"a\"\n",
    "lst = [\"a\",\"b\",\"c\"].copy()\n",
    "\n",
    "# Driver内的\n",
    "print([(socket.gethostbyname(socket.gethostname()) + \"__\" + str(threading.currentThread().ident) + \"__\" + str(id(lst)))])\n",
    "\n",
    "# Executor内的\n",
    "# 同一个Executor内的不同分区，常量是共享的\n",
    "print(rdd.map(lambda x: (socket.gethostbyname(socket.gethostname()) + \"__\" + str(threading.currentThread().ident) + \"__\" + str(id(cst)))).distinct().collect())\n",
    "# 同一个Executor内的不同分区，常量是共享的\n",
    "print(rdd.map(lambda x: (socket.gethostbyname(socket.gethostname()) + \"__\" + str(threading.currentThread().ident) + \"__\" + str(id(lst[0])))).distinct().collect())\n",
    "# 同一个Executor内的不同分区，发送的对象可能会存在多份相同的数据\n",
    "print(rdd.map(lambda x: (socket.gethostbyname(socket.gethostname()) + \"__\" + str(threading.currentThread().ident) + \"__\" + str(id(lst)))).distinct().collect())\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "62f99ff0-e695-439c-a746-7efd8f95ca38",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "从上面的案例我们可以看出，本地list对象被发送到每个分区的处理线程上使用，也就是executor内（即便是在单机环境，也会发送到对应的线程上），可能会存放多份一样的数据。\n",
    "\n",
    "executor是进程，进程内的资源可以共享的，这多份一样的数据就没有必要了，它造成了内存的浪费。\n",
    "\n",
    "**解决方案 -- 广播变量**\n",
    "\n",
    "如果将本地list对象标记为广播变量对象，那么当上述场景出现时，Spark只会：\n",
    "* 给每个executor一份数据，以节省内存。而不是像上面那样每个分区的处理线程都放一份。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "175b1946-11db-4c43-883e-0ca9f8c3558f",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "from pyspark.taskcontext import TaskContext\n",
    "import socket\n",
    "import threading\n",
    "\n",
    "rdd = sc.parallelize(range(1,1001),10)\n",
    "\n",
    "cst = \"a\"\n",
    "lst = sc.broadcast([\"a\",\"b\",\"c\"].copy())\n",
    "\n",
    "# Driver内的\n",
    "print([(socket.gethostbyname(socket.gethostname()) + \"__\" + str(threading.currentThread().ident) + \"__\" + str(id(lst)))])\n",
    "\n",
    "# Executor内的\n",
    "# 同一个Executor内的不同分区，常量是共享的\n",
    "print(rdd.map(lambda x: (socket.gethostbyname(socket.gethostname()) + \"__\" + str(threading.currentThread().ident) + \"__\" + str(id(cst)))).distinct().collect())\n",
    "# 同一个Executor内的不同分区，常量是共享的\n",
    "print(rdd.map(lambda x: (socket.gethostbyname(socket.gethostname()) + \"__\" + str(threading.currentThread().ident) + \"__\" + str(id(lst.value[0])))).distinct().collect())\n",
    "# 同一个Executor内的不同分区，发送的对象可能会存在多份相同的数据\n",
    "print(rdd.map(lambda x: (socket.gethostbyname(socket.gethostname()) + \"__\" + str(threading.currentThread().ident) + \"__\" + str(id(lst.value)))).distinct().collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "a8d0d312-391f-411e-ae6b-3d820d2f845f",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 累加器"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "d955ef6a-eff1-405f-9b99-d83044b2faa8",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)\n",
    "\n",
    "totals = 10\n",
    "\n",
    "def map_func(data):\n",
    "    global totals\n",
    "    totals += 1\n",
    "    return (data, totals)\n",
    "\n",
    "print(rdd.map(map_func).glom().collect())\n",
    "print(totals)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "15f49f38-b3be-473a-a1e8-d9b0d5db3389",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "从上面的例子我们看出，totals在Driver中初始化，并且在Executor中需要的时候会从Driver发送到Executor。\n",
    "\n",
    "但是在Executor中运算完成后，Executor中的totals无论变成多少，都不会影响Driver上的totals的值。\n",
    "\n",
    "我们有时候希望在Executor上运行的是统计逻辑，最终的统计结果会在Driver上也体现出来。\n",
    "\n",
    "**解决方案 -- 累加器**\n",
    "\n",
    "累加器对象，构建方式：sc.accumulator(初始值)\n",
    "\n",
    "这个对象唯一不同的是：这个对象可以从各个Executor中收集他们的执行结果，作用回自己身上。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "c3ead3ed-d646-4361-86a0-1d6f8d0a0de9",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)\n",
    "\n",
    "totals = sc.accumulator(10)\n",
    "\n",
    "def map_func(data):\n",
    "    global totals\n",
    "    totals += 1\n",
    "    return (data, totals)\n",
    "\n",
    "print(rdd.map(map_func).glom().collect())\n",
    "print(totals.value)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "b7ce0161-5156-4524-96ce-256309e23908",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "### 注意事项\n",
    "\n",
    "因为RDD是过程数据，如果RDD上执行多次Action，那么RDD可能会构建多次。\n",
    "\n",
    "如果累加器累加代码存在于重新构建的步骤中，累加器累加代码就可能被多次执行。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7b3682e8-fcab-4d9a-80f2-481887fe1d52",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)\n",
    "\n",
    "totals = sc.accumulator(10)\n",
    "\n",
    "def map_func(data):\n",
    "    global totals\n",
    "    totals += 1\n",
    "    return (data, totals)\n",
    "\n",
    "rdd2 = rdd.map(map_func)\n",
    "print(rdd2.glom().collect())\n",
    "print(rdd2.glom().collect())\n",
    "print(rdd2.glom().collect())\n",
    "print(totals.value)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "e01e159d-dddd-4dd1-99b8-80fd1a1b4163",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "解决这个问题，可以使用缓存或者CheckPoint。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "a5e3f9ef-4bb6-4adc-a22d-057c515319a3",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)\n",
    "\n",
    "totals = sc.accumulator(10)\n",
    "\n",
    "def map_func(data):\n",
    "    global totals\n",
    "    totals += 1\n",
    "    return (data, totals)\n",
    "\n",
    "rdd2 = rdd.map(map_func)\n",
    "rdd2.cache()\n",
    "\n",
    "print(rdd2.glom().collect())\n",
    "print(rdd2.glom().collect())\n",
    "print(rdd2.glom().collect())\n",
    "print(totals.value)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "0574b23c-edd8-4765-a610-b4529437382b",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 案例\n",
    "\n",
    "从订单列表中统计 \"Plain Papadum\",\"Butter Chicken\",\"Bengal Fry Fish\" 这三样产品的销售总数。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "f332241a-77a1-4c27-8d31-e5e1c84bc60a",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "import datetime\n",
    "\n",
    "# 读取数据文件\n",
    "fileRdd = sc.textFile(\"/mnt/databrickscontainer1/restaurant-1-orders.csv\",4)\n",
    "\n",
    "# 广播变量，减少内存中相同数据的份数\n",
    "items = sc.broadcast([\"Plain Papadum\",\"Butter Chicken\",\"Bengal Fry Fish\"])\n",
    "# 累加器\n",
    "totals = sc.accumulator(0)\n",
    "\n",
    "# 将订单中的标题行去掉\n",
    "dataRdd = fileRdd.map(lambda x: x.split(\",\")).filter(lambda x: x[0] != \"Order Number\")\n",
    "\n",
    "# print(dataRdd.map(lambda x:x[2]).distinct().collect())\n",
    "# print(dataRdd.map(lambda x: x[2] in items.value).collect())\n",
    "\n",
    "# 使用广播变量，过滤出列表中列出的产品及销售数量\n",
    "saleRdd = dataRdd.filter(lambda x: x[2] in items.value).map(lambda x: (x[2],int(x[3])))\n",
    "\n",
    "# 仅保留销售数量\n",
    "quantityRdd = saleRdd.map(lambda x: x[1])\n",
    "\n",
    "# 缓存一下\n",
    "quantityRdd.cache()\n",
    "\n",
    "# 使用reduce计算数量\n",
    "print(quantityRdd.reduce(lambda a,b: a + b))\n",
    "# 使用fold计算数量\n",
    "print(quantityRdd.fold(0, lambda a,b: a + b))\n",
    "# 使用累加器计算数量\n",
    "print(totals)\n",
    "def sum_func(data):\n",
    "    global totals\n",
    "    totals += data\n",
    "\n",
    "quantityRdd.foreach(sum_func)\n",
    "print(totals)"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "language": "python",
   "notebookMetadata": {},
   "notebookName": "共享变量",
   "notebookOrigID": 2180139033493837,
   "widgets": {}
  },
  "kernelspec": {
   "display_name": "",
   "name": ""
  },
  "language_info": {
   "name": ""
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
